package com.laosg.springboot.study.nsq.consumer.service.impl;

import cn.hutool.json.JSONUtil;
import com.laosg.springboot.study.nsq.consumer.model.NsqMessage;
import com.sproutsocial.nsq.Message;
import com.sproutsocial.nsq.MessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * Created by kaimin on 30/4/2019.
 * time : 11:03
 * 消息处理
 */
@Service
@Slf4j
public class ConsumerService implements MessageHandler {


    @Override
    public void accept(Message msg) {
        String massage = new String(msg.getData());
        NsqMessage nsqMessage = null;
        try {
            nsqMessage = JSONUtil.toBean(massage, NsqMessage.class);

        } catch (Exception e) {
            log.error("消息无法转换，存在问题");
            msg.finish();
        }
        if (!nsqMessage.getAction().equals("Channel1")) {
            // 如果nsq消息体中的action不等于当前的chanel名称,说明不是当前消费者需要处理的数据,确认消费即可
            msg.finish();
        }
        try {
            log.info("消费特定的消息:{}", nsqMessage.getBody());
            //确认消息
            msg.finish();
        } catch (Exception e) {
            //说明异常,重试下
            log.info("消息异常，重试以下！");
            msg.requeue();
        }
    }
}
